Solutions/ESET Protect Platform/Data Connectors/integration/models.py (86 lines of code) (raw):

import logging import os import typing as t from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from importlib import resources import yaml @dataclass class TokenStorage: __access_token: str | None = field(default=None, init=False) __refresh_token: str | None = field(default=None, init=False) __expiration_time: datetime | None = field(default=None, init=False) @property def access_token(self) -> str | None: return self.__access_token @access_token.setter def access_token(self, value: str) -> None: self.__access_token = value @property def refresh_token(self) -> str | None: return self.__refresh_token @refresh_token.setter def refresh_token(self, value: str) -> None: self.__refresh_token = value @property def expiration_time(self) -> datetime | None: return self.__expiration_time @expiration_time.setter def expiration_time(self, value: datetime) -> None: self.__expiration_time = value def to_dict(self) -> dict[str, t.Any]: return { "access_token": self.access_token, "refresh_token": self.refresh_token, "expiration_time": self.expiration_time, } class Config: def __init__(self) -> None: config = self.get_config_params() if config: self.max_retries: int = config.get("max_retries") # type: ignore self.retry_delay: float = float(config.get("retry_delay")) # type: ignore self.requests_timeout = config.get("requests_timeout") self.buffer: int = config.get("buffer") # type: ignore self.data_sources: dict[str, t.Any] = config.get("data_sources") # type: ignore self.version: str = config.get("version") # type: ignore def get_config_params(self) -> dict[str, t.Any] | t.Any: try: return yaml.safe_load( resources.files(__package__ or "integration").parent.joinpath("config.yml").read_bytes() # type: ignore ) except FileNotFoundError as e: logging.error(e) raise FileNotFoundError("The config file is not found. Further processing is impossible.") class EnvVariables: def __init__(self) -> None: self.__username: str | None = os.getenv("USERNAME_INTEGRATION") self.__password: str | None = os.getenv("PASSWORD_INTEGRATION") self.interval: int = int(os.getenv("INTERVAL", 5)) self.last_detection_time: str = os.getenv( "LAST_DETECTION", (datetime.now(timezone.utc) - timedelta(seconds=self.interval * 60)).strftime("%Y-%m-%dT%H:%M:%SZ"), ) self.endpoint_uri: str = os.getenv("ENDPOINT_URI", "") self.dcr_immutableid: str = os.getenv("DCR_IMMUTABLEID", "") self.stream_name: str = os.getenv("STREAM_NAME", "") self.ep_instance: str = os.getenv("EP_INSTANCE", "") self.ei_instance: str = os.getenv("EI_INSTANCE", "") self.ecos_instance: str = os.getenv("ECOS_INSTANCE", "") self.__conn_str: str = os.getenv("WEBSITE_CONTENTAZUREFILECONNECTIONSTRING", "") self.__key_base64: str = os.getenv("KEY_BASE64", "") region = os.getenv("INSTANCE_REGION", "eu") self.oauth_url: str = f"https://{region}.business-account.iam.eset.systems" self.detections_url: str = f"https://{region}.incident-management.eset.systems" @property def username(self) -> str | None: return self.__username @property def password(self) -> str | None: return self.__password @property def conn_str(self) -> str: return self.__conn_str @property def key_base64(self) -> str: return self.__key_base64